Skip to content

[fix][broker]excessive replication speed leads to error: Producer send queue is full #24189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Jul 2, 2025

Conversation

poorbarcode
Copy link
Contributor

@poorbarcode poorbarcode commented Apr 21, 2025

Motivation

Background

  • Replication has a limitation that limits the maximum in-flight publishing message.
    • Users can configure the quota by replicationProducerQueueSize
    • Replication has a variable named pendingMessages that records how many messages are pending to be published
    • When there is a task that fetches schemas, the replicator will pause and mark fetchSchemaInProgress as true
    • When the replicator needs to rewind the cursor, the replicator will pause and mark waitForCursorRewinding as true
  • Replication allows at most one inflight cursor reading, which was limited by havePendingRead.
  • Reolication allows multi inflight publishing

Issue: The multiple mechanisms described above can not work well

time/thread read more entries A read more entries B
1 calculate permits: got 1000 calculate permits: got 1000
2 set havePendingRead -> true
3 start reading
4 read out 1000 msgs
5 publishing is messages, and decrease permits one by one
6 set havePendingRead -> true
7 the 1000 msgs are still in-progress
8 start reading
9 read out 1000 msgs
10 publishing is messages, and decrease permits one by one
11 There are 2000 msgs in publishing, which is more than expected, get error Producer send queue is full
12 rewind cursor and trigger more read more entries, leads the situation bader

pulsar-broker 2025-03-27T22:02:44,247+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://public/default/output-partition-0 | prod-->prod-repl] Error producing on re
mote broker
pulsar-broker org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
pulsar-broker     at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:1055) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:534) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator.replicateEntries(GeoPersistentReplicator.java:191) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.broker.service.persistent.PersistentReplicator.readEntriesComplete(PersistentReplicator.java:313) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:180) ~[io.streamnative-managed-ledger-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker     at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
pulsar-broker     at java.base/java.lang.Thread.run(Unknown Source) [?:?]
pulsar-broker 2025-03-27T22:02:44,247+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/output-partition-0-pulsar.repl.prod-repl] Rewind from 2111423:1 to 2111423
:0
pulsar-broker 2025-03-27T22:02:44,247+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://public/default/output-partition-0 | prod-->prod-repl] Error producing on re
mote broker
pulsar-broker org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
pulsar-broker     at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:1055) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:534) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator.replicateEntries(GeoPersistentReplicator.java:191) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.broker.service.persistent.PersistentReplicator.readEntriesComplete(PersistentReplicator.java:313) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:180) ~[io.streamnative-managed-ledger-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker     at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
pulsar-broker     at java.base/java.lang.Thread.run(Unknown Source) [?:?]

Modifications

  • merge multiple mechanisms into one and fix the issue

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Apr 21, 2025
@poorbarcode poorbarcode changed the title [fix][broker]excessive replication speed leads to error: Producer sen… [fix][broker]excessive replication speed leads to error: Producer send queue is full Apr 21, 2025
@poorbarcode poorbarcode self-assigned this Apr 21, 2025
@poorbarcode poorbarcode added this to the 4.1.0 milestone Apr 21, 2025
@poorbarcode poorbarcode added release/3.0.12 release/3.3.7 release/4.0.5 type/bug The PR fixed a bug or issue reported a bug labels Apr 21, 2025
@lhotari
Copy link
Member

lhotari commented Apr 22, 2025

@poorbarcode is there any relationship to PIP-269: Add an epoch of cursor to discard outdated reading or any other previous reported issues ?

@poorbarcode
Copy link
Contributor Author

@lhotari

@poorbarcode is there any relationship to #20469 or any other previous reported issues ?

It does not relate to

@poorbarcode
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

@codecov-commenter
Copy link

codecov-commenter commented May 21, 2025

Codecov Report

Attention: Patch coverage is 81.36364% with 41 lines in your changes missing coverage. Please review.

Project coverage is 74.26%. Comparing base (bbc6224) to head (880ba91).
Report is 1159 commits behind head on master.

Files with missing lines Patch % Lines
...roker/service/persistent/PersistentReplicator.java 85.71% 11 Missing and 13 partials ⚠️
...ar/broker/service/persistent/ShadowReplicator.java 22.22% 6 Missing and 1 partial ⚠️
...ache/pulsar/broker/service/AbstractReplicator.java 66.66% 2 Missing and 1 partial ⚠️
.../java/org/apache/pulsar/client/impl/ClientCnx.java 25.00% 2 Missing and 1 partial ⚠️
...er/service/persistent/GeoPersistentReplicator.java 84.61% 2 Missing ⚠️
...sar/broker/service/persistent/PersistentTopic.java 66.66% 2 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24189      +/-   ##
============================================
+ Coverage     73.57%   74.26%   +0.68%     
- Complexity    32624    32752     +128     
============================================
  Files          1877     1868       -9     
  Lines        139502   145542    +6040     
  Branches      15299    16660    +1361     
============================================
+ Hits         102638   108081    +5443     
- Misses        28908    28924      +16     
- Partials       7956     8537     +581     
Flag Coverage Δ
inttests 26.69% <31.36%> (+2.10%) ⬆️
systests 23.31% <1.36%> (-1.02%) ⬇️
unittests 73.77% <81.36%> (+0.92%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...a/org/apache/pulsar/broker/service/Replicator.java 0.00% <ø> (ø)
...service/nonpersistent/NonPersistentReplicator.java 67.00% <100.00%> (+3.63%) ⬆️
...roker/service/persistent/MessageDeduplication.java 76.83% <100.00%> (-4.08%) ⬇️
...pulsar/client/impl/GeoReplicationProducerImpl.java 49.13% <100.00%> (ø)
...va/org/apache/pulsar/client/impl/ProducerImpl.java 83.87% <100.00%> (+0.27%) ⬆️
...er/service/persistent/GeoPersistentReplicator.java 71.65% <84.61%> (-6.37%) ⬇️
...sar/broker/service/persistent/PersistentTopic.java 80.21% <66.66%> (+1.76%) ⬆️
...ache/pulsar/broker/service/AbstractReplicator.java 67.64% <66.66%> (-17.36%) ⬇️
.../java/org/apache/pulsar/client/impl/ClientCnx.java 69.65% <25.00%> (-2.12%) ⬇️
...ar/broker/service/persistent/ShadowReplicator.java 45.90% <22.22%> (-12.64%) ⬇️
... and 1 more

... and 1076 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@lhotari
Copy link
Member

lhotari commented May 30, 2025

test failure:

 Error:  Tests run: 8, Failures: 1, Errors: 0, Skipped: 7, Time elapsed: 65.679 s <<< FAILURE! - in org.apache.pulsar.broker.service.DisabledCreateTopicToRemoteClusterForReplicationTest
  Error:  org.apache.pulsar.broker.service.DisabledCreateTopicToRemoteClusterForReplicationTest.testCreatePartitionedTopicWithNsReplication  Time elapsed: 20.673 s  <<< FAILURE!
  java.lang.NullPointerException: Cannot invoke "org.apache.pulsar.client.api.Message.getValue()" because the return value of "org.apache.pulsar.client.api.Consumer.receive(int, java.util.concurrent.TimeUnit)" is null
  	at org.apache.pulsar.broker.service.DisabledCreateTopicToRemoteClusterForReplicationTest.testCreatePartitionedTopicWithNsReplication(DisabledCreateTopicToRemoteClusterForReplicationTest.java:111)
  	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
  	at java.base/java.lang.Thread.run(Thread.java:1583)

could be a flaky test, attempt to fix was #24141

@lhotari
Copy link
Member

lhotari commented May 30, 2025

there seems to be more flakiness in tests. I can see the first attempt failed in a replication test as well

  Error:  Tests run: 140, Failures: 1, Errors: 0, Skipped: 116, Time elapsed: 343.104 s <<< FAILURE! - in org.apache.pulsar.broker.service.ReplicatorTest
  Error:  org.apache.pulsar.broker.service.ReplicatorTest.testReplicationWillNotStuckByIncompleteSchemaFuture  Time elapsed: 16.339 s  <<< FAILURE!
  org.awaitility.core.ConditionTimeoutException: Assertion condition replication task finished expected [true] but found [false] within 10 seconds.
  	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
  	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
  	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
  	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:985)
  	at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:769)
  	at org.apache.pulsar.broker.service.ReplicatorTest.waitReplicateFinish(ReplicatorTest.java:523)
  	at org.apache.pulsar.broker.service.ReplicatorTest.testReplicationWillNotStuckByIncompleteSchemaFuture(ReplicatorTest.java:516)
  	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
  	at java.base/java.lang.Thread.run(Thread.java:1583)
  Caused by: java.lang.AssertionError: replication task finished expected [true] but found [false]
  	at org.testng.Assert.fail(Assert.java:110)
  	at org.testng.Assert.failNotEquals(Assert.java:1577)
  	at org.testng.Assert.assertTrue(Assert.java:56)
  	at org.apache.pulsar.broker.service.ReplicatorTest.lambda$waitReplicateFinish$5(ReplicatorTest.java:526)
  	at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
  	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:248)
  	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:235)
  	... 4 more

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please debug the flaky test failures in replication tests before we merge this change.

@lhotari
Copy link
Member

lhotari commented May 30, 2025

/pulsarbot rerun-failure-checks

@poorbarcode poorbarcode requested a review from lhotari June 4, 2025 01:50
@poorbarcode poorbarcode force-pushed the fix/replication_queue_full branch from 7a565f2 to 880ba91 Compare June 23, 2025 01:36
@poorbarcode poorbarcode dismissed lhotari’s stale review July 2, 2025 06:09

He has un-replied for a long time

@poorbarcode
Copy link
Contributor Author

Dismissed @lhotari's request change because he did not reply for a long time

@poorbarcode poorbarcode merged commit 37b17d3 into apache:master Jul 2, 2025
101 of 104 checks passed
poorbarcode added a commit that referenced this pull request Jul 7, 2025
…d queue is full (#24189)

Co-authored-by: Kai Wang <[email protected]>
(cherry picked from commit 37b17d3)
poorbarcode added a commit that referenced this pull request Jul 7, 2025
…d queue is full (#24189)

Co-authored-by: Kai Wang <[email protected]>
(cherry picked from commit 37b17d3)
poorbarcode added a commit that referenced this pull request Jul 7, 2025
…d queue is full (#24189)

Co-authored-by: Kai Wang <[email protected]>
(cherry picked from commit 37b17d3)
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 22, 2025
…d queue is full (apache#24189)

Co-authored-by: Kai Wang <[email protected]>
(cherry picked from commit 37b17d3)
(cherry picked from commit 1d49396)
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 22, 2025
…d queue is full (apache#24189)

Co-authored-by: Kai Wang <[email protected]>
(cherry picked from commit 37b17d3)
(cherry picked from commit 96be3bb)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 23, 2025
…d queue is full (apache#24189)

Co-authored-by: Kai Wang <[email protected]>
(cherry picked from commit 37b17d3)
(cherry picked from commit 1d49396)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 24, 2025
…d queue is full (apache#24189)

Co-authored-by: Kai Wang <[email protected]>
(cherry picked from commit 37b17d3)
(cherry picked from commit 96be3bb)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants